/*
** Zabbix
** Copyright (C) 2001-2021 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

#include "common.h"

#include "daemon.h"
#include "zbxself.h"
#include "log.h"
#include "zbxipcservice.h"
#include "zbxalgo.h"
#include "zbxserver.h"
#include "alerter_protocol.h"
#include "alert_manager.h"
#include "zbxmedia.h"
#include "zbxembed.h"
#include "zbxserialize.h"
#include "zbxalert.h"

#define ZBX_AM_LOCATION_NOWHERE		0
#define ZBX_AM_LOCATION_QUEUE		1

#define ZBX_UPDATE_STR(dst, src)			\
	if (NULL == src)				\
		zbx_free(dst); 				\
	else if (NULL == dst || 0 != strcmp(dst, src)) 	\
		dst = zbx_strdup(dst, src);

#define ZBX_AM_DB_POLL_DELAY	1

#define ALERT_SOURCE_EXTERNAL	0xffff

#define ZBX_ALERTPOOL_SOURCE(id) (id >> 48)
#define ZBX_ALERTPOOL_OBJECT(id) ((id >> 32) & 0xffff)

#define ZBX_AM_MEDIATYPE_FLAG_NONE	0x00
#define ZBX_AM_MEDIATYPE_FLAG_REMOVE	0x01

#define ZBX_DB_PING_FREQUENCY		SEC_PER_MIN

#define ZBX_AM_MEDIATYPE_CLEANUP_FREQUENCY	SEC_PER_HOUR

#define ZBX_ALERT_RESULT_BATCH_SIZE	1000

#define ZBX_MEDIA_CONTENT_TYPE_DEFAULT	255

extern unsigned char	process_type, program_type;
extern int		server_num, process_num;

extern int	CONFIG_ALERTER_FORKS;
extern char	*CONFIG_ALERT_SCRIPTS_PATH;

/*
 * The alert queue is implemented as a nested queue.
 *
 * At the bottom layer is media type queue, enforcing the media type maxsessions setting.
 * At the next layer is alert pool queue. Alert pool is an artificial construct to group
 * alerts generated by the same source (event source, object and objectid) so that they
 * are executed sequentially.
 * At the top layer is alert queue.
 *
 * mediatypes
 *     alertpools
 *         alerts
 *
 * Media type queue is sorted by the timestamp of the miminum item of its alertpool queue.
 * Alert pool queue is sorted by the timestamp of the minimum item of its alerts queue.
 * Alerts queue is sorted by the alert scheduled send timestamp.
 *
 * When taking the next alert to send the following actions are done:
 *    1) take the next media type object from media type queue
am *    2) take the next alert pool object from media type alertpool queue
 *    3) take the next alert from alert pool alerts queue
 *    4) if media type maxsessions limit has not reached, put the media type object back in queue
 *
 * When processing alert response the following actions are done:
 *    1) find alerts media type and alert pool objects
 *    2) cache alert status update to be flushed into database later
 *    3) if alert failed and can be retried put it back into its alert pool queue,
 *       otherwise free the alert object
 *    4) release alert pool object, put it back into media type alertpools queue if the alert pool
 *       was not removed
 *    5) release media type object, put it back into media types queue if the media type object
 *       was not removed
 */

typedef char * zbx_shared_str_t;

/* alert data */
typedef struct
{
	zbx_uint64_t		alertid;
	zbx_uint64_t		mediatypeid;
	zbx_uint64_t		alertpoolid;
	zbx_uint64_t		eventid;
	/* the problem event id for recovery events */
	zbx_uint64_t		p_eventid;
	int			nextsend;

	/* alert data */
	char			*sendto;
	char			*subject;
	zbx_shared_str_t	message;
	char			*params;
	unsigned char		content_type;
	int			status;
	int			retries;

	int			objectid;
}
zbx_am_alert_t;

/* Alert pool data.                                                          */
/* Alerts are assigned to pools based on event source, object and objectid.  */
/* While alert pools can be processed in parallel, alerts inside alert pool  */
/* are processed sequentially.                                               */
typedef struct
{
	zbx_uint64_t		id;
	zbx_uint64_t		mediatypeid;

	/* alert queue */
	zbx_binary_heap_t	queue;

	int			location;

	/* the number of currently processing alerts */
	int			alerts_num;

	/* the number of alert objects for this alert pool */
	int			refcount;
}
zbx_am_alertpool_t;

/* report data */
typedef struct
{
	char			*subject;
	char			*message;
	char			*content;
	char			*content_name;
	char			*content_type;
	zbx_uint32_t		content_size;
}
zbx_am_dispatch_t;

/* alerter data */
typedef struct
{
	/* the connected aleter client */
	zbx_ipc_client_t	*client;

	zbx_am_alert_t		*alert;
}
zbx_am_alerter_t;


/* alert manager data */
typedef struct
{
	/* number of queued alerts */
	zbx_uint64_t		alerts_num;

	/* alerter vector, created during manager initialization */
	zbx_vector_ptr_t	alerters;
	zbx_queue_ptr_t		free_alerters;

	/* alerters indexed by IPC service clients */
	zbx_hashset_t		alerters_client;

	/* the next alerter index to be assigned to new IPC service clients */
	int			next_alerter_index;

	zbx_hashset_t		mediatypes;
	zbx_hashset_t		alertpools;

	/* the alert status update cache */
	zbx_hashset_t		results;

	/* the watchdog alert recipients */
	zbx_hashset_t		watchdog;

	/* mediatype queue */
	zbx_binary_heap_t	queue;

	/* the database status */
	int			dbstatus;

	/* the scripting engine */
	zbx_es_t		es;

	/* the IPC service */
	zbx_ipc_service_t	ipc;
}
zbx_am_t;


/* alerters client index hashset support */

static zbx_hash_t	alerter_hash_func(const void *d)
{
	const zbx_am_alerter_t	*alerter = *(const zbx_am_alerter_t **)d;

	zbx_hash_t hash = ZBX_DEFAULT_PTR_HASH_FUNC(&alerter->client);

	return hash;
}

static int	alerter_compare_func(const void *d1, const void *d2)
{
	const zbx_am_alerter_t	*p1 = *(const zbx_am_alerter_t **)d1;
	const zbx_am_alerter_t	*p2 = *(const zbx_am_alerter_t **)d2;

	ZBX_RETURN_IF_NOT_EQUAL(p1->client, p2->client);

	return 0;
}

/* alert pool hashset support */

static zbx_hash_t	am_alertpool_hash_func(const void *data)
{
	const zbx_am_alertpool_t	*pool = (const zbx_am_alertpool_t *)data;

	zbx_hash_t			hash;

	hash = ZBX_DEFAULT_UINT64_HASH_FUNC(&pool->id);
	hash = ZBX_DEFAULT_UINT64_HASH_ALGO(&pool->mediatypeid, sizeof(pool->mediatypeid), hash);

	return hash;
}

static int	am_alertpool_compare_func(const void *d1, const void *d2)
{
	const zbx_am_alertpool_t	*pool1 = (const zbx_am_alertpool_t *)d1;
	const zbx_am_alertpool_t	*pool2 = (const zbx_am_alertpool_t *)d2;

	ZBX_RETURN_IF_NOT_EQUAL(pool1->id, pool2->id);
	ZBX_RETURN_IF_NOT_EQUAL(pool1->mediatypeid, pool2->mediatypeid);

	return 0;
}

/* queue support */

static int	am_alert_compare(const zbx_am_alert_t *alert1, const zbx_am_alert_t *alert2)
{
	ZBX_RETURN_IF_NOT_EQUAL(alert1->nextsend, alert2->nextsend);
	ZBX_RETURN_IF_NOT_EQUAL(alert1->alertid, alert2->alertid);

	return 0;
}

static int	am_alert_queue_compare(const void *d1, const void *d2)
{
	const zbx_binary_heap_elem_t	*e1 = (const zbx_binary_heap_elem_t *)d1;
	const zbx_binary_heap_elem_t	*e2 = (const zbx_binary_heap_elem_t *)d2;

	return am_alert_compare((const zbx_am_alert_t *)e1->data, (const zbx_am_alert_t *)e2->data);
}

static int	am_alertpool_compare(const zbx_am_alertpool_t *pool1, const zbx_am_alertpool_t *pool2)
{
	zbx_binary_heap_elem_t	*e1, *e2;

	e1 = zbx_binary_heap_find_min((zbx_binary_heap_t *)&pool1->queue);
	e2 = zbx_binary_heap_find_min((zbx_binary_heap_t *)&pool2->queue);

	return am_alert_compare((const zbx_am_alert_t *)e1->data, (const zbx_am_alert_t *)e2->data);
}

static int	am_alertpool_queue_compare(const void *d1, const void *d2)
{
	const zbx_binary_heap_elem_t	*e1 = (const zbx_binary_heap_elem_t *)d1;
	const zbx_binary_heap_elem_t	*e2 = (const zbx_binary_heap_elem_t *)d2;

	return am_alertpool_compare((const zbx_am_alertpool_t *)e1->data, (const zbx_am_alertpool_t *)e2->data);
}

static int	am_mediatype_compare(const zbx_am_mediatype_t *media1, const zbx_am_mediatype_t *media2)
{
	zbx_binary_heap_elem_t	*e1, *e2;

	e1 = zbx_binary_heap_find_min((zbx_binary_heap_t *)&media1->queue);
	e2 = zbx_binary_heap_find_min((zbx_binary_heap_t *)&media2->queue);

	return am_alertpool_compare((const zbx_am_alertpool_t *)e1->data, (const zbx_am_alertpool_t *)e2->data);
}

static int	am_mediatype_queue_compare(const void *d1, const void *d2)
{
	const zbx_binary_heap_elem_t	*e1 = (const zbx_binary_heap_elem_t *)d1;
	const zbx_binary_heap_elem_t	*e2 = (const zbx_binary_heap_elem_t *)d2;

	return am_mediatype_compare((const zbx_am_mediatype_t *)e1->data, (const zbx_am_mediatype_t *)e2->data);
}

/******************************************************************************
 *                                                                            *
 * reference counted strings                                                  *
 *                                                                            *
 ******************************************************************************/

static zbx_shared_str_t	shared_str_new(const char *src)
{
	size_t	len;
	char	*ptr;

	if (NULL == src)
		return NULL;

	len = strlen(src);
	ptr = zbx_malloc(NULL, len + sizeof(zbx_uint32_t) + 1);
	*((zbx_uint32_t *)ptr) = 0;
	memcpy(ptr + sizeof(zbx_uint32_t), src, len + 1);

	return ptr + 4;
}

static zbx_shared_str_t 	shared_str_addref(zbx_shared_str_t str)
{
	if (NULL != str)
	{
		zbx_uint32_t	*refcount = (zbx_uint32_t *)(str - sizeof(zbx_uint32_t));
		(*refcount)++;
	}
	return str;
}

static void	shared_str_release(zbx_shared_str_t str)
{
	if (NULL != str)
	{
		zbx_uint32_t	*refcount = (zbx_uint32_t *)(str - sizeof(zbx_uint32_t));

		if (0 == --(*refcount))
			zbx_free(refcount);
	}
}

/******************************************************************************
 *                                                                            *
 * Function: am_dispatch_free                                                 *
 *                                                                            *
 ******************************************************************************/
static void	am_dispatch_free(zbx_am_dispatch_t *dispatch)
{
	zbx_free(dispatch->subject);
	zbx_free(dispatch->message);
	zbx_free(dispatch->content);
	zbx_free(dispatch->content_name);
	zbx_free(dispatch->content_type);
	zbx_free(dispatch);
}


/******************************************************************************
 *                                                                            *
 * Function: am_get_mediatype                                                 *
 *                                                                            *
 * Purpose: gets media type object                                            *
 *                                                                            *
 * Parameters: manager     - [IN] the alert manager                           *
 *             mediatypeid - [IN] the media type identifier                   *
 *                                                                            *
 * Return value: The media type object or NULL if not found                   *
 *                                                                            *
 ******************************************************************************/
static zbx_am_mediatype_t	*am_get_mediatype(zbx_am_t *manager, zbx_uint64_t mediatypeid)
{
	return (zbx_am_mediatype_t *)zbx_hashset_search(&manager->mediatypes, &mediatypeid);
}

/******************************************************************************
 *                                                                            *
 * Function: zbx_am_update_webhook                                            *
 *                                                                            *
 * Purpose: updates additional webhook media type fields                      *
 *                                                                            *
 ******************************************************************************/
static void	zbx_am_update_webhook(zbx_am_t *manager, zbx_am_mediatype_t *mediatype, const char *script,
		const char *timeout)
{
	if (FAIL == is_time_suffix(timeout, &mediatype->timeout, ZBX_LENGTH_UNLIMITED))
	{
		mediatype->error = zbx_strdup(mediatype->error, "Invalid timeout value in media type configuration.");
		return;
	}

	if (NULL == mediatype->script || 0 != strcmp(mediatype->script, script))
	{
		if (SUCCEED != zbx_es_is_env_initialized(&manager->es))
		{
			if (SUCCEED != zbx_es_init_env(&manager->es, &mediatype->error))
				return;
		}

		zbx_free(mediatype->script_bin);
		if (SUCCEED != zbx_es_compile(&manager->es, script, &mediatype->script_bin, &mediatype->script_bin_sz,
				&mediatype->error))
		{
			return;
		}
		mediatype->script = zbx_strdup(mediatype->script, script);
	}
}

/******************************************************************************
 *                                                                            *
 * Function: am_update_mediatype                                              *
 *                                                                            *
 * Purpose: updates media type object, creating one if necessary              *
 *                                                                            *
 * Parameters: manager     - [IN] the alert manager                           *
 *             ...         - [IN] media type properties                       *
 *                                                                            *
 ******************************************************************************/
static void	am_update_mediatype(zbx_am_t *manager, zbx_uint64_t mediatypeid, unsigned char type,
		const char *smtp_server, const char *smtp_helo, const char *smtp_email,
		const char *exec_path, const char *gsm_modem, const char *username, const char *passwd,
		unsigned short smtp_port, unsigned char smtp_security, unsigned char smtp_verify_peer,
		unsigned char smtp_verify_host, unsigned char smtp_authentication, const char *exec_params,
		int maxsessions, int maxattempts, const char *attempt_interval, unsigned char content_type,
		const char *script, const char *timeout, unsigned char flags)
{
	zbx_am_mediatype_t	*mediatype;

	if (NULL == (mediatype = am_get_mediatype(manager, mediatypeid)))
	{
		zbx_am_mediatype_t	mediatype_local = {
				.mediatypeid = mediatypeid,
				.location = ZBX_AM_LOCATION_NOWHERE,
				.flags = flags
		};

		mediatype = (zbx_am_mediatype_t *)zbx_hashset_insert(&manager->mediatypes, &mediatype_local,
				sizeof(mediatype_local));

		zbx_binary_heap_create(&mediatype->queue, am_alertpool_queue_compare,
				ZBX_BINARY_HEAP_OPTION_DIRECT);
	}
	else
	{
		/* reset remove flag if normal media type is being added */
		if (ZBX_AM_MEDIATYPE_FLAG_NONE == flags)
			mediatype->flags = ZBX_AM_MEDIATYPE_FLAG_NONE;
	}

	mediatype->type = type;

	zbx_free(mediatype->error);
	ZBX_UPDATE_STR(mediatype->smtp_server, smtp_server);
	ZBX_UPDATE_STR(mediatype->smtp_helo, smtp_helo);
	ZBX_UPDATE_STR(mediatype->smtp_email, smtp_email);
	ZBX_UPDATE_STR(mediatype->exec_path, exec_path);
	ZBX_UPDATE_STR(mediatype->exec_params, exec_params);
	ZBX_UPDATE_STR(mediatype->gsm_modem, gsm_modem);
	ZBX_UPDATE_STR(mediatype->username, username);
	ZBX_UPDATE_STR(mediatype->passwd, passwd);

	mediatype->smtp_port = smtp_port;
	mediatype->smtp_security = smtp_security;
	mediatype->smtp_verify_peer = smtp_verify_peer;
	mediatype->smtp_verify_host = smtp_verify_host;
	mediatype->smtp_authentication = smtp_authentication;

	mediatype->maxsessions = maxsessions;
	mediatype->maxattempts = maxattempts;
	mediatype->content_type = content_type;

	if (FAIL == is_time_suffix(attempt_interval, &mediatype->attempt_interval, ZBX_LENGTH_UNLIMITED))
	{
		mediatype->error = zbx_strdup(mediatype->error, "Invalid media type attempt interval.");
		return;
	}

	if (MEDIA_TYPE_WEBHOOK == mediatype->type)
		zbx_am_update_webhook(manager, mediatype, script, timeout);
}

/******************************************************************************
 *                                                                            *
 * Function: am_push_mediatype                                                *
 *                                                                            *
 * Purpose: pushes media type into manager media type queue                   *
 *                                                                            *
 * Parameters: manager   - [IN] the alert manager                             *
 *             mediatype - [IN] the media type                                *
 *                                                                            *
 * Comments: The media tyep is inserted into queue only if it was not already *
 *           queued and if the number of media type alerts being processed    *
 *           not reached the limit.                                           *
 *           If media type is already queued only its location in the queue   *
 *           is updated.                                                      *
 *                                                                            *
 ******************************************************************************/
static void	am_push_mediatype(zbx_am_t *manager, zbx_am_mediatype_t *mediatype)
{
	zbx_binary_heap_elem_t	elem = {mediatype->mediatypeid, mediatype};

	if (SUCCEED == zbx_binary_heap_empty(&mediatype->queue))
		return;

	if (ZBX_AM_LOCATION_NOWHERE == mediatype->location)
	{
		if (0 == mediatype->maxsessions || mediatype->alerts_num < mediatype->maxsessions)
		{
			zbx_binary_heap_insert(&manager->queue, &elem);
			mediatype->location = ZBX_AM_LOCATION_QUEUE;
		}
	}
	else
		zbx_binary_heap_update_direct(&manager->queue, &elem);
}

/******************************************************************************
 *                                                                            *
 * Function: am_pop_mediatype                                                 *
 *                                                                            *
 * Purpose: gets the next media type from queue                               *
 *                                                                            *
 * Parameters: manager - [IN] the alert manager                               *
 *                                                                            *
 * Return value: The media type object.                                       *
 *                                                                            *
 ******************************************************************************/
static zbx_am_mediatype_t	*am_pop_mediatype(zbx_am_t *manager)
{
	zbx_binary_heap_elem_t	*elem;
	zbx_am_mediatype_t	*mediatype;

	if (FAIL != zbx_binary_heap_empty(&manager->queue))
		return NULL;

	elem = zbx_binary_heap_find_min(&manager->queue);
	mediatype = (zbx_am_mediatype_t *)elem->data;
	mediatype->location = ZBX_AM_LOCATION_NOWHERE;

	zbx_binary_heap_remove_min(&manager->queue);

	return mediatype;
}

/******************************************************************************
 *                                                                            *
 * Function: am_remove_mediatype                                              *
 *                                                                            *
 ******************************************************************************/
static void am_remove_mediatype(zbx_am_t *manager, zbx_am_mediatype_t *mediatype)
{
	zabbix_log(LOG_LEVEL_DEBUG, "%s() mediatypeid:" ZBX_FS_UI64, __func__, mediatype->mediatypeid);

	zbx_free(mediatype->smtp_server);
	zbx_free(mediatype->smtp_helo);
	zbx_free(mediatype->smtp_email);
	zbx_free(mediatype->exec_path);
	zbx_free(mediatype->exec_params);
	zbx_free(mediatype->gsm_modem);
	zbx_free(mediatype->username);
	zbx_free(mediatype->passwd);
	zbx_free(mediatype->script);
	zbx_free(mediatype->script_bin);
	zbx_free(mediatype->error);

	zbx_binary_heap_destroy(&mediatype->queue);
	zbx_hashset_remove_direct(&manager->mediatypes, mediatype);
}

/******************************************************************************
 *                                                                            *
 * Function: am_release_mediatype                                             *
 *                                                                            *
 ******************************************************************************/
static int	am_release_mediatype(zbx_am_t *manager, zbx_am_mediatype_t *mediatype)
{
	if (0 != --mediatype->refcount)
		return FAIL;

	if (0 != (mediatype->flags & ZBX_AM_MEDIATYPE_FLAG_REMOVE))
		am_remove_mediatype(manager, mediatype);

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Function: am_calc_alertpoolid                                              *
 *                                                                            *
 * Purpose: calculate alert pool id from event source, object and objectid    *
 *                                                                            *
 * Parameters: source   - [IN] the event source                               *
 *             object   - [IN] the event object type                          *
 *             objectid - [IN] the event objectid                             *
 *                                                                            *
 * Return value: The alert pool id.                                           *
 *                                                                            *
 ******************************************************************************/
static zbx_uint64_t	am_calc_alertpoolid(int source, int object, zbx_uint64_t objectid)
{
	zbx_uint64_t	alertpoolid;

	if (source < 0 || source > 0xffff)
		THIS_SHOULD_NEVER_HAPPEN;

	if (object < 0 || object > 0xffff)
		THIS_SHOULD_NEVER_HAPPEN;

	alertpoolid = source & 0xffff;
	alertpoolid <<= 16;
	alertpoolid |= object & 0xffff;
	alertpoolid <<= 32;
	alertpoolid |= ZBX_DEFAULT_UINT64_HASH_FUNC(&objectid);

	return alertpoolid;
}

/******************************************************************************
 *                                                                            *
 * Function: am_get_alertpool                                                 *
 *                                                                            *
 * Purpose: gets alert pool object, creating one if the object with specified *
 *          identifiers was not found                                         *
 *                                                                            *
 * Parameters: manager     - [IN] the alert manager                           *
 *             mediatypeid - [IN] the media type identifier                   *
 *             alertpoolid - [IN] the alert pool identifier                   *
 *                                                                            *
 * Return value: The alert pool object.                                       *
 *                                                                            *
 ******************************************************************************/
static zbx_am_alertpool_t	*am_get_alertpool(zbx_am_t *manager, zbx_uint64_t mediatypeid, zbx_uint64_t alertpoolid)
{
	zbx_am_alertpool_t	*alertpool, alertpool_local;

	alertpool_local.mediatypeid = mediatypeid;
	alertpool_local.id = alertpoolid;

	if (NULL == (alertpool = (zbx_am_alertpool_t *)zbx_hashset_search(&manager->alertpools, &alertpool_local)))
	{
		alertpool = (zbx_am_alertpool_t *)zbx_hashset_insert(&manager->alertpools, &alertpool_local,
				sizeof(alertpool_local));

		zbx_binary_heap_create(&alertpool->queue, am_alert_queue_compare, ZBX_BINARY_HEAP_OPTION_EMPTY);

		alertpool->location = ZBX_AM_LOCATION_NOWHERE;
		alertpool->refcount = 0;
		alertpool->alerts_num = 0;
	}

	return alertpool;
}

/******************************************************************************
 *                                                                            *
 * Function: am_push_alertpool                                                *
 *                                                                            *
 * Purpose: pushes alert pool into media type alert pool queue                *
 *                                                                            *
 * Parameters: mediatype - [IN] the media type                                *
 *             alertpool - [IN] the alert pool                                *
 *                                                                            *
 * Comments: The alert pool is inserted into queue only if it was not already *
 *           queued. Otherwise its position in the queue is updated.          *
 *                                                                            *
 ******************************************************************************/
static void	am_push_alertpool(zbx_am_mediatype_t *mediatype, zbx_am_alertpool_t *alertpool)
{
	zbx_binary_heap_elem_t	elem = {alertpool->id, alertpool};

	if (ZBX_AM_LOCATION_NOWHERE == alertpool->location)
	{
		if (0 == alertpool->alerts_num)
		{
			zbx_binary_heap_insert(&mediatype->queue, &elem);
			alertpool->location = ZBX_AM_LOCATION_QUEUE;
		}
	}
	else
		zbx_binary_heap_update_direct(&mediatype->queue, &elem);
}

/******************************************************************************
 *                                                                            *
 * Function: am_pop_alertpool                                                 *
 *                                                                            *
 * Purpose: gets the next alert pool from queue                               *
 *                                                                            *
 * Parameters: mediatype - [IN] the media type                                *
 *                                                                            *
 * Return value: The alert pool object.                                       *
 *                                                                            *
 ******************************************************************************/
static zbx_am_alertpool_t	*am_pop_alertpool(zbx_am_mediatype_t *mediatype)
{
	zbx_binary_heap_elem_t	*elem;
	zbx_am_alertpool_t	*alertpool;

	if (FAIL != zbx_binary_heap_empty(&mediatype->queue))
		return NULL;

	elem = zbx_binary_heap_find_min(&mediatype->queue);
	alertpool = (zbx_am_alertpool_t *)elem->data;
	alertpool->location = ZBX_AM_LOCATION_NOWHERE;

	zbx_binary_heap_remove_min(&mediatype->queue);

	return alertpool;
}

/******************************************************************************
 *                                                                            *
 * Function: am_release_alertpool                                             *
 *                                                                            *
 * Purpose: removes alert pool                                                *
 *                                                                            *
 * Parameters: manager - [IN] the alert manager                               *
 *             alert   - [IN] the alert pool                                  *
 *                                                                            *
 * Return value: SUCCEED - the object was removed                             *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 ******************************************************************************/
static int	am_release_alertpool(zbx_am_t *manager, zbx_am_alertpool_t *alertpool)
{

	if (0 != -- alertpool->refcount)
		return FAIL;

	zbx_binary_heap_destroy(&alertpool->queue);
	zbx_hashset_remove_direct(&manager->alertpools, alertpool);

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Function: am_create_alert                                                  *
 *                                                                            *
 * Purpose: creates new alert object                                          *
 *                                                                            *
 * Parameters: ...           - [IN] alert data                                *
 *                                                                            *
 * Return value: The alert object.                                            *
 *                                                                            *
 ******************************************************************************/
static zbx_am_alert_t	*am_create_alert(zbx_uint64_t alertid, zbx_uint64_t mediatypeid, int source, int object,
		zbx_uint64_t objectid, const char *sendto, const char *subject, zbx_shared_str_t message,
		const char *params, unsigned char content_type, int status, int retries, int nextsend)
{
	zbx_am_alert_t	*alert;

	alert = (zbx_am_alert_t *)zbx_malloc(NULL, sizeof(zbx_am_alert_t));
	alert->alertid = alertid;
	alert->mediatypeid = mediatypeid;
	alert->alertpoolid = am_calc_alertpoolid(source, object, objectid);
	alert->objectid = objectid;
	alert->content_type = content_type;
	alert->eventid = 0;
	alert->p_eventid = 0;

	if (NULL != sendto)
		alert->sendto = zbx_strdup(NULL, sendto);
	else
		alert->sendto = NULL;

	if (NULL != subject)
		alert->subject = zbx_strdup(NULL, subject);
	else
		alert->subject = NULL;

	alert->message = shared_str_addref(message);

	if (NULL != params)
		alert->params = zbx_strdup(NULL, params);
	else
		alert->params = NULL;

	alert->status = status;
	alert->retries = retries;
	alert->nextsend = nextsend;

	return alert;
}

/******************************************************************************
 *                                                                            *
 * Function: am_copy_db_alert                                                 *
 *                                                                            *
 * Purpose: creates new alert object from db alert                            *
 *                                                                            *
 * Parameters: db_alert - [IN] the db alert object                            *
 *                                                                            *
 * Return value: The alert object.                                            *
 *                                                                            *
 * Comments: The db_alert is destroyed during copying process and should not  *
 *           be accessed/freed afterwards.                                    *
 *                                                                            *
 ******************************************************************************/
static zbx_am_alert_t	*am_copy_db_alert(zbx_am_db_alert_t *db_alert)
{
	zbx_am_alert_t	*alert;

	alert = (zbx_am_alert_t *)zbx_malloc(NULL, sizeof(zbx_am_alert_t));
	alert->alertid = db_alert->alertid;
	alert->mediatypeid = db_alert->mediatypeid;
	alert->alertpoolid = am_calc_alertpoolid(db_alert->source, db_alert->object, db_alert->objectid);
	alert->objectid = db_alert->objectid;
	alert->eventid = db_alert->eventid;
	alert->p_eventid = db_alert->p_eventid;
	alert->content_type = ZBX_MEDIA_CONTENT_TYPE_DEFAULT;

	alert->sendto = db_alert->sendto;
	alert->subject = db_alert->subject;

	alert->message = shared_str_addref(shared_str_new(db_alert->message));
	zbx_free(db_alert->message);

	alert->params = db_alert->params;

	alert->status = db_alert->status;
	alert->retries = db_alert->retries;
	alert->nextsend = 0;

	zbx_free(db_alert);

	return alert;
}

/******************************************************************************
 *                                                                            *
 * Function: am_alert_free                                                    *
 *                                                                            *
 * Purpose: frees the alert object                                            *
 *                                                                            *
 * Parameters: alert - [IN] the alert object                                  *
 *                                                                            *
 ******************************************************************************/
static void	am_alert_free(zbx_am_alert_t *alert)
{
	zbx_free(alert->sendto);
	zbx_free(alert->subject);
	shared_str_release(alert->message);
	zbx_free(alert->params);
	zbx_free(alert);
}

/******************************************************************************
 *                                                                            *
 * Function: am_push_alert                                                    *
 *                                                                            *
 * Purpose: pushes alert into alert pool alert queue                          *
 *                                                                            *
 * Parameters: alertpool - [IN] the alert pool                                *
 *             alert     - [IN] the alert                                     *
 *                                                                            *
 ******************************************************************************/
static void	am_push_alert(zbx_am_alertpool_t *alertpool, zbx_am_alert_t *alert)
{
	zbx_binary_heap_elem_t	elem = {0, alert};

	zbx_binary_heap_insert(&alertpool->queue, &elem);
}

/******************************************************************************
 *                                                                            *
 * Function: am_pop_alert                                                     *
 *                                                                            *
 * Purpose: gets the next alert from queue                                    *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *                                                                            *
 * Return value: The alert object.                                            *
 *                                                                            *
 ******************************************************************************/
static zbx_am_alert_t	*am_pop_alert(zbx_am_t *manager)
{
	zbx_am_mediatype_t	*mediatype;
	zbx_am_alertpool_t	*alertpool;
	zbx_am_alert_t		*alert;
	zbx_binary_heap_elem_t	*elem;

	if (NULL == (mediatype = am_pop_mediatype(manager)))
		return NULL;

	alertpool = am_pop_alertpool(mediatype);

	elem = zbx_binary_heap_find_min(&alertpool->queue);
	alert = (zbx_am_alert_t *)elem->data;
	zbx_binary_heap_remove_min(&alertpool->queue);

	/* requeue media type if the number of parallel alerts has not yet reached */
	mediatype->alerts_num++;
	alertpool->alerts_num++;
	if (0 == mediatype->maxsessions || mediatype->alerts_num < mediatype->maxsessions)
		am_push_mediatype(manager, mediatype);

	return alert;
}

/******************************************************************************
 *                                                                            *
 * Function: am_remove_alert                                                  *
 *                                                                            *
 * Purpose: removes alert and requeues associated alert pool and media type   *
 *                                                                            *
 * Parameters: manager - [IN] the alert manager                               *
 *             alert   - [IN] the alert                                       *
 *                                                                            *
 ******************************************************************************/
static void	am_remove_alert(zbx_am_t *manager, zbx_am_alert_t *alert)
{
	zbx_am_alertpool_t	*alertpool;
	zbx_am_mediatype_t	*mediatype;

	if (NULL != (mediatype = am_get_mediatype(manager, alert->mediatypeid)))
	{
		mediatype->alerts_num--;

		if (NULL != (alertpool = am_get_alertpool(manager, alert->mediatypeid, alert->alertpoolid)))
		{
			alertpool->alerts_num--;
			if (SUCCEED != am_release_alertpool(manager, alertpool))
				am_push_alertpool(mediatype, alertpool);
		}

		if (SUCCEED != am_release_mediatype(manager, mediatype))
			am_push_mediatype(manager, mediatype);
	}

	am_alert_free(alert);

	manager->alerts_num--;
}

/******************************************************************************
 *                                                                            *
 * Function: am_retry_alert                                                   *
 *                                                                            *
 * Purpose: retries alert if there are attempts left or removes it            *
 *                                                                            *
 * Parameters: manager - [IN] the alert manager                               *
 *             alert   - [IN] the alert                                       *
 *                                                                            *
 * Return value: SUCCEED - the alert was queued to be sent again              *
 *               FAIL - the alert retries value exceeded the mediatype        *
 *                      maxattempts limit and alert was removed as failed.    *
 *                                                                            *
 ******************************************************************************/
static int	am_retry_alert(zbx_am_t *manager, zbx_am_alert_t *alert)
{
	zbx_am_alertpool_t	*alertpool;
	zbx_am_mediatype_t	*mediatype;
	int			ret = FAIL;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() alertid:" ZBX_FS_UI64, __func__, alert->alertid);

	if (NULL == (mediatype = am_get_mediatype(manager, alert->mediatypeid)))
		goto out;

	if (++alert->retries >= mediatype->maxattempts)
		goto out;

	alert->nextsend = time(NULL) + mediatype->attempt_interval;

	alertpool = am_get_alertpool(manager, alert->mediatypeid, alert->alertpoolid);

	mediatype->alerts_num--;
	alertpool->alerts_num--;

	am_push_alert(alertpool, alert);
	am_push_alertpool(mediatype, alertpool);
	am_push_mediatype(manager, mediatype);

	ret = SUCCEED;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Function: am_alerter_free                                                  *
 *                                                                            *
 * Purpose: frees alerter                                                     *
 *                                                                            *
 ******************************************************************************/
static void	am_alerter_free(zbx_am_alerter_t *alerter)
{
	zbx_ipc_client_close(alerter->client);
	zbx_free(alerter);
}

/******************************************************************************
 *                                                                            *
 * Function: am_register_alerter                                              *
 *                                                                            *
 * Purpose: registers alerter                                                 *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected alerter                           *
 *             message - [IN] the received message                            *
 *                                                                            *
 ******************************************************************************/
static void	am_register_alerter(zbx_am_t *manager, zbx_ipc_client_t *client, zbx_ipc_message_t *message)
{
	zbx_am_alerter_t	*alerter = NULL;	/* if 'alerter' type changes do not forget to change sizeof() */
							/* (see comment below) */
	pid_t			ppid;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	memcpy(&ppid, message->data, sizeof(ppid));

	if (ppid != getppid())
	{
		zbx_ipc_client_close(client);
		zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process");
	}
	else
	{
		if (manager->next_alerter_index == manager->alerters.values_num)
		{
			THIS_SHOULD_NEVER_HAPPEN;
			exit(EXIT_FAILURE);
		}

		alerter = (zbx_am_alerter_t *)manager->alerters.values[manager->next_alerter_index++];
		alerter->client = client;

		/* sizeof(zbx_am_alerter_t *) in the following line returns size of 'alerter' pointer. */
		/* sizeof(alerter) is not used to avoid analyzer warning */
		zbx_hashset_insert(&manager->alerters_client, &alerter, sizeof(zbx_am_alerter_t *));
		zbx_queue_ptr_push(&manager->free_alerters, alerter);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: am_get_alerter_by_client                                         *
 *                                                                            *
 * Purpose: returns alerter by connected client                               *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected alerter                           *
 *                                                                            *
 * Return value: The alerter                                                  *
 *                                                                            *
 ******************************************************************************/
static zbx_am_alerter_t	*am_get_alerter_by_client(zbx_am_t *manager, zbx_ipc_client_t *client)
{
	zbx_am_alerter_t	**alerter, alerter_local, *plocal = &alerter_local;

	plocal->client = client;

	alerter = (zbx_am_alerter_t **)zbx_hashset_search(&manager->alerters_client, &plocal);

	if (NULL == alerter)
	{
		THIS_SHOULD_NEVER_HAPPEN;
		exit(EXIT_FAILURE);
	}

	return *alerter;
}

#if defined(HAVE_MYSQL)
#	define ZBX_DATABASE_TYPE "MySQL"
#elif defined(HAVE_ORACLE)
#	define ZBX_DATABASE_TYPE "Oracle"
#elif defined(HAVE_POSTGRESQL)
#	define ZBX_DATABASE_TYPE "PostgreSQL"
#endif

/******************************************************************************
 *                                                                            *
 * Function: am_create_db_alert_message                                       *
 *                                                                            *
 * Purpose: get and format error message from database when it is unavailable *
 *                                                                            *
 * Return value: full database error message is allocated                     *
 *                                                                            *
 ******************************************************************************/
static char	*am_create_db_alert_message(void)
{
	const char	*error;
	char		*alert_message = NULL;
	size_t		alert_message_alloc = 0, alert_message_offset = 0;

	zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, "%s database \"%s\"",
			ZBX_DATABASE_TYPE, CONFIG_DBNAME);

	if ('\0' != *CONFIG_DBHOST)
	{
		zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, " on \"%s",
				CONFIG_DBHOST);

		if (0 != CONFIG_DBPORT)
		{
			zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, ":%d\"",
					CONFIG_DBPORT);
		}
		else
			zbx_chrcpy_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, '\"');
	}

	zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, " is not available");

	if (NULL != (error = zbx_db_last_strerr()) && '\0' != *error)
		zbx_snprintf_alloc(&alert_message, &alert_message_alloc, &alert_message_offset, ": %s", error);

	return alert_message;
}

#undef ZBX_DATABASE_TYPE

/******************************************************************************
 *                                                                            *
 * Function: am_queue_watchdog_alerts                                         *
 *                                                                            *
 * Purpose: queues 'database down' watchdog alerts                            *
 *                                                                            *
 * Parameters: manager - [IN] the alert manager                               *
 *                                                                            *
 ******************************************************************************/
static void	am_queue_watchdog_alerts(zbx_am_t *manager)
{
	zbx_am_media_t		*media;
	zbx_am_mediatype_t	*mediatype;
	zbx_am_alertpool_t	*alertpool;
	zbx_am_alert_t		*alert;
	zbx_hashset_iter_t	iter;
	const char		*alert_subject = "Zabbix database is not available.";
	char			*alert_message;

	zabbix_log(LOG_LEVEL_DEBUG, "%s() recipients:%d", __func__, manager->watchdog.num_data);

	zbx_hashset_iter_reset(&manager->watchdog, &iter);
	while (NULL != (media = (zbx_am_media_t *)zbx_hashset_iter_next(&iter)))
	{
		if (NULL == (mediatype = am_get_mediatype(manager, media->mediatypeid)))
		{
			zabbix_log(LOG_LEVEL_DEBUG, "cannot find media type with id " ZBX_FS_UI64, media->mediatypeid);
			continue;
		}

		mediatype->refcount++;

		alert_message = am_create_db_alert_message();

		if (ZBX_MEDIA_CONTENT_TYPE_HTML == mediatype->content_type)
		{
			char	*am_esc;

			am_esc = xml_escape_dyn(alert_message);
			alert_message = zbx_dsprintf(alert_message, "<html><pre>%s</pre></html>", am_esc);
			zbx_free(am_esc);
		}

		alert = am_create_alert(0, media->mediatypeid, 0, 0, 0, media->sendto, alert_subject,
				shared_str_new(alert_message), NULL, mediatype->content_type, 0, 0, 0);

		alertpool = am_get_alertpool(manager, alert->mediatypeid, alert->alertpoolid);
		alertpool->refcount++;

		am_push_alert(alertpool, alert);
		am_push_alertpool(mediatype, alertpool);
		am_push_mediatype(manager, mediatype);

		zbx_free(alert_message);
	}
}

/******************************************************************************
 *                                                                            *
 * Function: am_init                                                          *
 *                                                                            *
 * Purpose: initializes alert manager                                         *
 *                                                                            *
 * Parameters: manager - [IN] the manager to initialize                       *
 *                                                                            *
 ******************************************************************************/
static int	am_init(zbx_am_t *manager, char **error)
{
	int			i, ret;
	zbx_am_alerter_t	*alerter;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() alerters:%d", __func__, CONFIG_ALERTER_FORKS);

	if (FAIL == (ret = zbx_ipc_service_start(&manager->ipc, ZBX_IPC_SERVICE_ALERTER, error)))
		goto out;

	manager->alerts_num = 0;
	zbx_vector_ptr_create(&manager->alerters);
	zbx_queue_ptr_create(&manager->free_alerters);
	zbx_hashset_create(&manager->alerters_client, 0, alerter_hash_func, alerter_compare_func);

	manager->next_alerter_index = 0;

	for (i = 0; i < CONFIG_ALERTER_FORKS; i++)
	{
		alerter = (zbx_am_alerter_t *)zbx_malloc(NULL, sizeof(zbx_am_alerter_t));

		alerter->client = NULL;

		zbx_vector_ptr_append(&manager->alerters, alerter);
	}

	zbx_hashset_create(&manager->mediatypes, 5, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_hashset_create(&manager->alertpools, 100, am_alertpool_hash_func, am_alertpool_compare_func);
	zbx_hashset_create(&manager->results, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_hashset_create(&manager->watchdog, 5, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_binary_heap_create(&manager->queue, am_mediatype_queue_compare, ZBX_BINARY_HEAP_OPTION_DIRECT);

	zbx_es_init(&manager->es);
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Function: am_destroy                                                       *
 *                                                                            *
 * Purpose: destroys alert manager                                            *
 *                                                                            *
 * Parameters: manager - [IN] the manager to destroy                          *
 *                                                                            *
 ******************************************************************************/
static void	am_destroy(zbx_am_t *manager)
{
	zbx_am_alert_t		*alert;
	zbx_hashset_iter_t	iter;
	zbx_am_media_t		*media;

	zbx_es_destroy(&manager->es);

	zbx_hashset_destroy(&manager->alerters_client);
	zbx_queue_ptr_destroy(&manager->free_alerters);
	zbx_vector_ptr_clear_ext(&manager->alerters, (zbx_mem_free_func_t)am_alerter_free);
	zbx_vector_ptr_destroy(&manager->alerters);

	while (NULL != (alert = am_pop_alert(manager)))
		am_remove_alert(manager, alert);

	zbx_binary_heap_destroy(&manager->queue);

	zbx_hashset_iter_reset(&manager->watchdog, &iter);
	while (NULL != (media = (zbx_am_media_t *)zbx_hashset_iter_next(&iter)))
	{
		zbx_free(media->sendto);
		zbx_hashset_iter_remove(&iter);
	}
	zbx_hashset_destroy(&manager->watchdog);

	zbx_hashset_destroy(&manager->results);
	zbx_hashset_destroy(&manager->alertpools);
	zbx_hashset_destroy(&manager->mediatypes);
}

/******************************************************************************
 *                                                                            *
 * Function: am_db_update_alert                                               *
 *                                                                            *
 * Purpose: update alert status in local cache to be flushed after reading    *
 *          new alerts from database                                          *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             alertid - [IN] the alert identifier                            *
 *             status  - [IN] the alert status                                *
 *             retries - [IN] the number of attempted sending retries         *
 *             error   - [IN] the error message                               *
 *                                                                            *
 ******************************************************************************/
static void	am_db_update_alert(zbx_am_t *manager, zbx_am_alert_t *alert, int status, int retries, const char *value,
		const char *error)
{
	zbx_am_result_t	*result;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() alertid:" ZBX_FS_UI64 " status:%d retries:%d value:%s error:%s", __func__,
			alert->alertid, status, retries, ZBX_NULL2EMPTY_STR(value), ZBX_NULL2EMPTY_STR(error));

	/* alerts with 0 alertid are runtime alerts generated by alert manager when database is down */
	if (0 != alert->alertid)
	{
		if (NULL == (result = (zbx_am_result_t *)zbx_hashset_search(&manager->results, &alert->alertid)))
		{
			zbx_am_result_t	update_local = {
					.alertid = alert->alertid,
					.eventid = alert->eventid,
					.mediatypeid = alert->mediatypeid,
					.source = ZBX_ALERTPOOL_SOURCE(alert->alertpoolid)
			};

			result = (zbx_am_result_t *)zbx_hashset_insert(&manager->results, &update_local,
					sizeof(update_local));
		}

		result->retries = retries;
		result->status = status;
		ZBX_UPDATE_STR(result->value, value);
		ZBX_UPDATE_STR(result->error, error);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: am_external_alert_send_response                                  *
 *                                                                            *
 * Purpose: send response to external alert request                           *
 *                                                                            *
 * Parameters: alerter_service - [IN] the IPC service                         *
 *             alert           - [IN] the alert                               *
 *             value           - [IN] the value                               *
 *             error           - [IN] error message                           *
 *             debug           - [IN] debug message                           *
 *                                                                            *
 ******************************************************************************/
static void	am_external_alert_send_response(const zbx_ipc_service_t *alerter_service, const zbx_am_alert_t *alert,
		const char *value, int errcode, const char *error, const char *debug)
{
	zbx_ipc_client_t	*client;

	if (NULL != (client = zbx_ipc_client_by_id(alerter_service, alert->alertid)))
	{
		unsigned char	*data;
		zbx_uint32_t	data_len;

		data_len = zbx_alerter_serialize_result_ext(&data, alert->sendto, value, errcode, error, debug);
		zbx_ipc_client_send(client, ZBX_IPC_ALERTER_SEND_ALERT, data, data_len);
		zbx_free(data);
	}
	else
		zabbix_log(LOG_LEVEL_DEBUG, "client has disconnected");
}

/******************************************************************************
 *                                                                            *
 * Function: am_sync_watchdog                                                 *
 *                                                                            *
 * Purpose: synchronize watchdog alert recipients                             *
 *                                                                            *
 * Parameters: manager    - [IN] the manager                                  *
 *             medias     - [IN] the new watchdog media list                  *
 *             medias_num - [IN] the number of watchdog medias                *
 *                                                                            *
 ******************************************************************************/
static void	am_sync_watchdog(zbx_am_t *manager, zbx_am_media_t **medias, int medias_num)
{
	int			i;
	zbx_hashset_t		mediaids;
	zbx_am_media_t		*media, media_local;
	zbx_hashset_iter_t	iter;
	zbx_vector_ptr_t	media_new;
	static int		old_count = -1;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() recipients:%d", __func__, medias_num);

	zbx_hashset_create(&mediaids, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_ptr_create(&media_new);

	for (i = 0; i < medias_num; i++)
	{
		if (NULL == (media = (zbx_am_media_t *)zbx_hashset_search(&manager->watchdog, &medias[i]->mediaid)))
		{
			media_local.mediaid = medias[i]->mediaid;
			media = (zbx_am_media_t *)zbx_hashset_insert(&manager->watchdog, &media_local,
					sizeof(media_local));
			media->sendto = NULL;
			zbx_vector_ptr_append(&media_new, media);
		}
		media->mediatypeid = medias[i]->mediatypeid;
		ZBX_UPDATE_STR(media->sendto,  medias[i]->sendto);
		zbx_hashset_insert(&mediaids, &media->mediaid, sizeof(media->mediaid));
	}

	/* drop removed watchdog alert recipients from cache */
	zbx_hashset_iter_reset(&manager->watchdog, &iter);
	while (NULL != (media = (zbx_am_media_t *)zbx_hashset_iter_next(&iter)))
	{
		if (NULL != zbx_hashset_search(&mediaids, &media->mediaid))
			continue;

		zbx_free(media->sendto);
		zbx_hashset_iter_remove(&iter);
	}

	zbx_vector_ptr_destroy(&media_new);
	zbx_hashset_destroy(&mediaids);

	if (0 < old_count && 0 == manager->watchdog.num_data)
	{
		zabbix_log(LOG_LEVEL_WARNING, "watchdog: no recipients found for database down messages");
	}
	else if (0 == old_count && 0 < manager->watchdog.num_data)
	{
		zabbix_log(LOG_LEVEL_WARNING, "watchdog: %d recipient(s) found for database down messages",
				manager->watchdog.num_data);
	}

	old_count = manager->watchdog.num_data;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() recipients:%d", __func__, manager->watchdog.num_data);
}

/******************************************************************************
 *                                                                            *
 * Function: am_prepare_mediatype_exec_command                                *
 *                                                                            *
 * Purpose: gets script media type parameters with expanded macros            *
 *                                                                            *
 * Parameters: mediatype - [IN] the media type                                *
 *             alert     - [IN] the alert                                     *
 *             cmd       - [OUT] the command to execute                       *
 *             error     - [OUT] the error message                            *
 *                                                                            *
 * Return value: SUCCEED - the command was prepared successfully              *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 ******************************************************************************/
static int	am_prepare_mediatype_exec_command(zbx_am_mediatype_t *mediatype, zbx_am_alert_t *alert, char **cmd,
		char **error)
{
	DB_ALERT	db_alert;
	size_t		cmd_alloc = ZBX_KIBIBYTE, cmd_offset = 0;
	int		ret = FAIL;

	*cmd = (char *)zbx_malloc(NULL, cmd_alloc);

	zbx_snprintf_alloc(cmd, &cmd_alloc, &cmd_offset, "%s/%s", CONFIG_ALERT_SCRIPTS_PATH, mediatype->exec_path);

	if (0 == access(*cmd, X_OK))
	{
		char	*pstart, *pend;

		db_alert.sendto = (NULL != alert->sendto ? alert->sendto : zbx_strdup(NULL, ""));
		db_alert.subject = (NULL != alert->subject ? alert->subject : zbx_strdup(NULL, ""));
		db_alert.message = (NULL != alert->message ? alert->message : zbx_strdup(NULL, ""));

		for (pstart = mediatype->exec_params; NULL != (pend = strchr(pstart, '\n')); pstart = pend + 1)
		{
			char	*param_esc, *param = NULL;
			size_t	param_alloc = 0, param_offset = 0;

			zbx_strncpy_alloc(&param, &param_alloc, &param_offset, pstart, pend - pstart);

			substitute_simple_macros_unmasked(NULL, NULL, NULL, NULL, NULL, NULL, NULL, &db_alert, NULL,
					NULL, &param, MACRO_TYPE_ALERT, NULL, 0);

			param_esc = zbx_dyn_escape_shell_single_quote(param);
			zbx_snprintf_alloc(cmd, &cmd_alloc, &cmd_offset, " '%s'", param_esc);

			zbx_free(param_esc);
			zbx_free(param);
		}

		if (db_alert.sendto != alert->sendto)
			zbx_free(db_alert.sendto);
		if (db_alert.subject != alert->subject)
			zbx_free(db_alert.subject);
		if (db_alert.message != alert->message)
			zbx_free(db_alert.message);

		ret = SUCCEED;
	}
	else
	{
		*error = zbx_dsprintf(*error, "Cannot execute command \"%s\": %s", *cmd, zbx_strerror(errno));
		zbx_free(*cmd);
	}

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Function: am_process_alert                                                 *
 *                                                                            *
 * Purpose: sends alert to the alerter                                        *
 *                                                                            *
 * Parameters: manager         - [IN] the alert manager                       *
 *             alerter         - [IN] the target alerter                      *
 *             alert           - [IN] the alert to send                       *
 *                                                                            *
 * Return value: SUCCEED - the alert was successfully sent to alerter         *
 *               FAIL    - otherwise                                          *
 *                                                                            *
 ******************************************************************************/
static int	am_process_alert(zbx_am_t *manager, zbx_am_alerter_t *alerter, zbx_am_alert_t *alert)
{
	zbx_am_mediatype_t	*mediatype;
	unsigned char		*data = NULL, debug;
	size_t			data_len;
	zbx_uint64_t		command, p_eventid;
	char			*cmd = NULL, *error = NULL;
	int			ret = FAIL;
	unsigned char		content_type;

	zabbix_log(LOG_LEVEL_DEBUG, "%s() alertid:" ZBX_FS_UI64 " mediatypeid:" ZBX_FS_UI64 " alertpoolid:0x"
			ZBX_FS_UX64, __func__, alert->alertid, alert->mediatypeid, alert->alertpoolid);

	if (NULL == (mediatype = am_get_mediatype(manager, alert->mediatypeid)))
	{
		am_alert_free(alert);
		goto out;
	}

	if (NULL != mediatype->error)
	{
		if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alert->alertpoolid))
			am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, mediatype->error, NULL);
		else
			am_db_update_alert(manager, alert, ALERT_STATUS_FAILED, 0, NULL, mediatype->error);

		am_remove_alert(manager, alert);
		goto out;
	}

	switch (mediatype->type)
	{
		case MEDIA_TYPE_EMAIL:
			command = ZBX_IPC_ALERTER_EMAIL;
			p_eventid = (0 == alert->p_eventid ? alert->eventid : alert->p_eventid);

			if (ZBX_MEDIA_CONTENT_TYPE_DEFAULT == (content_type = alert->content_type))
				content_type = mediatype->content_type;

			data_len = zbx_alerter_serialize_email(&data, alert->alertid, alert->mediatypeid,
					p_eventid, alert->sendto, alert->subject, alert->message,
					mediatype->smtp_server, mediatype->smtp_port, mediatype->smtp_helo,
					mediatype->smtp_email, mediatype->smtp_security, mediatype->smtp_verify_peer,
					mediatype->smtp_verify_host, mediatype->smtp_authentication,
					mediatype->username, mediatype->passwd, content_type);
			break;
		case MEDIA_TYPE_SMS:
			command = ZBX_IPC_ALERTER_SMS;
			data_len = zbx_alerter_serialize_sms(&data, alert->alertid, alert->sendto, alert->message,
					mediatype->gsm_modem);
			break;
		case MEDIA_TYPE_EXEC:
			command = ZBX_IPC_ALERTER_EXEC;
			if (FAIL == am_prepare_mediatype_exec_command(mediatype, alert, &cmd, &error))
			{
				if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alert->alertpoolid))
					am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, error, NULL);
				else
					am_db_update_alert(manager, alert, ALERT_STATUS_FAILED, 0, NULL, error);

				am_remove_alert(manager, alert);
				zbx_free(error);
				goto out;
			}
			data_len = zbx_alerter_serialize_exec(&data, alert->alertid, cmd);
			zbx_free(cmd);
			break;
		case MEDIA_TYPE_WEBHOOK:
			command = ZBX_IPC_ALERTER_WEBHOOK;
			if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alert->alertpoolid))
				debug = ZBX_ALERT_DEBUG;
			else
				debug = ZBX_ALERT_NO_DEBUG;

			data_len = zbx_alerter_serialize_webhook(&data, mediatype->script_bin, mediatype->script_bin_sz,
					mediatype->timeout, alert->params, debug);
			break;
		default:
			error = "unsupported media type";
			if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alert->alertpoolid))
				am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, error, NULL);
			else
				am_db_update_alert(manager, alert, ALERT_STATUS_FAILED, 0, NULL, error);

			zabbix_log(LOG_LEVEL_ERR, "cannot process alertid:" ZBX_FS_UI64 ": unsupported media type: %d",
					alert->alertid, mediatype->type);
			am_remove_alert(manager, alert);
			goto out;
	}

	alerter->alert = alert;
	zbx_ipc_client_send(alerter->client, command, data, data_len);
	zbx_free(data);

	ret = SUCCEED;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Function: am_process_result                                                *
 *                                                                            *
 * Purpose: process alerter result                                            *
 *                                                                            *
 * Parameters: manager         - [IN] the manager                             *
 *             client          - [IN] the connected alerter                   *
 *             message         - [IN] the received message                    *
 *                                                                            *
 * Return value: SUCCEED - the alert was sent successfully                    *
 *               FAIL - otherwise                                             *
 *                                                                            *
 ******************************************************************************/
static int	am_process_result(zbx_am_t *manager, zbx_ipc_client_t *client, zbx_ipc_message_t *message)
{
	int			ret = FAIL, status;
	zbx_am_alerter_t	*alerter;
	char			*value, *errmsg, *debug;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (NULL == (alerter = am_get_alerter_by_client(manager, client)))
	{
		THIS_SHOULD_NEVER_HAPPEN;
		goto out;
	}

	if (NULL == alerter->alert)
	{
		THIS_SHOULD_NEVER_HAPPEN;
		goto out;
	}

	zabbix_log(LOG_LEVEL_DEBUG, "%s() alertid:" ZBX_FS_UI64 " mediatypeid:" ZBX_FS_UI64 " alertpoolid:0x"
			ZBX_FS_UX64, __func__, alerter->alert->alertid, alerter->alert->mediatypeid,
			alerter->alert->alertpoolid);

	zbx_alerter_deserialize_result(message->data, &value, &ret, &errmsg, &debug);

	if (ALERT_SOURCE_EXTERNAL == ZBX_ALERTPOOL_SOURCE(alerter->alert->alertpoolid))
	{
		am_external_alert_send_response(&manager->ipc, alerter->alert, value, ret, errmsg, debug);
		am_remove_alert(manager, alerter->alert);
	}
	else
	{
		if (SUCCEED == ret)
		{
			status = ALERT_STATUS_SENT;
		}
		else
		{
			if (SUCCEED == am_retry_alert(manager, alerter->alert))
				status = ALERT_STATUS_NOT_SENT;
			else
				status = ALERT_STATUS_FAILED;
		}

		am_db_update_alert(manager, alerter->alert, status, alerter->alert->retries, value, errmsg);

		if (ALERT_STATUS_NOT_SENT != status)
			am_remove_alert(manager, alerter->alert);
	}

	zbx_free(value);
	zbx_free(errmsg);
	zbx_free(debug);
	alerter->alert = NULL;

	zbx_queue_ptr_push(&manager->free_alerters, alerter);
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return ret;
}

/******************************************************************************
 *                                                                            *
 * Function: am_check_queue                                                   *
 *                                                                            *
 * Purpose: checks alert queue if there is an alert that should be sent now   *
 *                                                                            *
 * Parameters: manager - [IN] the alert manager                               *
 *             now     - [IN] the current timestamp                           *
 *                                                                            *
 * Return value: SUCCEED - an alert can be sent                               *
 *               FAIL - there are no alerts to be sent at this time           *
 *                                                                            *
 ******************************************************************************/
static int	am_check_queue(zbx_am_t *manager, int now)
{
	zbx_binary_heap_elem_t	*elem;
	zbx_am_mediatype_t	*mediatype;
	zbx_am_alertpool_t	*alertpool;
	zbx_am_alert_t		*alert;

	if (SUCCEED == zbx_binary_heap_empty(&manager->queue))
		return FAIL;

	elem = zbx_binary_heap_find_min(&manager->queue);
	mediatype = (zbx_am_mediatype_t *)elem->data;

	if (SUCCEED == zbx_binary_heap_empty(&mediatype->queue))
		return FAIL;

	elem = zbx_binary_heap_find_min(&mediatype->queue);
	alertpool = (zbx_am_alertpool_t *)elem->data;

	if (SUCCEED == zbx_binary_heap_empty(&alertpool->queue))
		return FAIL;

	elem = zbx_binary_heap_find_min(&alertpool->queue);
	alert = (zbx_am_alert_t *)elem->data;

	if (alert->nextsend > now)
		return FAIL;

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Function: am_update_mediatypes                                             *
 *                                                                            *
 * Purpose: update cached media types                                         *
 *                                                                            *
 ******************************************************************************/
static void	am_update_mediatypes(zbx_am_t *manager, zbx_ipc_message_t *message)
{
	zbx_am_db_mediatype_t	**mediatypes;
	int			mediatypes_num, i;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_alerter_deserialize_mediatypes(message->data, &mediatypes, &mediatypes_num);

	zabbix_log(LOG_LEVEL_DEBUG, "update %d media types", mediatypes_num);

	for (i = 0; i < mediatypes_num; i++)
	{
		zbx_am_db_mediatype_t	*mt = mediatypes[i];

		am_update_mediatype(manager, mt->mediatypeid, mt->type, mt->smtp_server, mt->smtp_helo, mt->smtp_email,
				mt->exec_path, mt->gsm_modem, mt->username, mt->passwd, mt->smtp_port, mt->smtp_security,
				mt->smtp_verify_peer, mt->smtp_verify_host, mt->smtp_authentication, mt->exec_params,
				mt->maxsessions, mt->maxattempts, mt->attempt_interval, mt->content_type,
				mt->script, mt->timeout, ZBX_AM_MEDIATYPE_FLAG_NONE);

		zbx_am_db_mediatype_clear(mt);
		zbx_free(mt);
	}
	zbx_free(mediatypes);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: am_queue_alert                                                   *
 *                                                                            *
 * Purpose: queue new alerts                                                  *
 *                                                                            *
 ******************************************************************************/
static int	am_queue_alert(zbx_am_t *manager, zbx_am_alert_t *alert, int now)
{
	zbx_am_mediatype_t	*mediatype;
	zbx_am_alertpool_t	*alertpool;

	alert->nextsend = now;

	if (NULL == (mediatype = am_get_mediatype(manager, alert->mediatypeid)))
		return FAIL;

	alertpool = am_get_alertpool(manager, alert->mediatypeid, alert->alertpoolid);

	alertpool->refcount++;
	mediatype->refcount++;

	am_push_alert(alertpool, alert);
	am_push_alertpool(mediatype, alertpool);
	am_push_mediatype(manager, mediatype);

	manager->alerts_num++;

	return SUCCEED;
}

/******************************************************************************
 *                                                                            *
 * Function: am_queue_alerts                                                  *
 *                                                                            *
 * Purpose: queue new alerts                                                  *
 *                                                                            *
 ******************************************************************************/
static void	am_queue_alerts(zbx_am_t *manager, zbx_ipc_message_t *message, int now)
{
	zbx_am_db_alert_t	**alerts;
	int			alerts_num, i;
	zbx_am_alert_t		*alert;

	now = time(NULL);
	zbx_alerter_deserialize_alerts(message->data, &alerts, &alerts_num);

	for (i = 0; i < alerts_num; i++)
	{
		alert = am_copy_db_alert(alerts[i]);
		if (FAIL == am_queue_alert(manager, alert, now))
		{
			am_alert_free(alert);
			continue;
		}
	}

	zbx_free(alerts);
}

/******************************************************************************
 *                                                                            *
 * Function: am_update_watchdog                                               *
 *                                                                            *
 * Purpose: update 'database down' watchdog alert recipients                  *
 *                                                                            *
 ******************************************************************************/
static void	am_update_watchdog(zbx_am_t *manager, zbx_ipc_message_t *message)
{
	zbx_am_media_t	**medias;
	int		medias_num, i;

	zbx_alerter_deserialize_medias(message->data, &medias, &medias_num);
	am_sync_watchdog(manager, medias, medias_num);

	for (i = 0; i < medias_num; i++)
		zbx_am_media_free(medias[i]);
	zbx_free(medias);
}

/******************************************************************************
 *                                                                            *
 * Function: am_drop_mediatypes                                               *
 *                                                                            *
 * Purpose: remove unused mediatypes                                          *
 *                                                                            *
 ******************************************************************************/
static void	am_drop_mediatypes(zbx_am_t *manager, zbx_ipc_message_t *message)
{
	zbx_uint64_t		*ids;
	int			ids_num, i;
	zbx_am_mediatype_t	*mediatype;

	zbx_alerter_deserialize_ids(message->data, &ids, &ids_num);

	for (i = 0; i < ids_num; i++)
	{
		if (NULL == (mediatype = (zbx_am_mediatype_t *)zbx_hashset_search(&manager->mediatypes, &ids[i])))
			continue;

		if (0 == mediatype->refcount)
			am_remove_mediatype(manager, mediatype);
		else
			mediatype->flags = ZBX_AM_MEDIATYPE_FLAG_REMOVE;
	}

	zbx_free(ids);
}

/******************************************************************************
 *                                                                            *
 * Function: am_flush_results                                                 *
 *                                                                            *
 * Purpose: returns alert sending results                                     *
 *                                                                            *
 ******************************************************************************/
static void	am_flush_results(zbx_am_t *manager, zbx_ipc_client_t *client)
{
	zbx_vector_ptr_t	results;
	zbx_hashset_iter_t	iter;
	zbx_am_result_t		*result;
	zbx_uint32_t		data_len;
	unsigned char		*data;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() results:%d", __func__, manager->results.num_data);

	if (0 == manager->results.num_data)
	{
		int		results_num = 0;
		unsigned char	buf[sizeof(results_num)];

		(void)zbx_serialize_value(buf, results_num);
		zbx_ipc_client_send(client, ZBX_IPC_ALERTER_RESULTS, buf, sizeof(results_num));
		goto out;
	}

	zbx_vector_ptr_create(&results);

	zbx_hashset_iter_reset(&manager->results, &iter);
	while (NULL != (result = (zbx_am_result_t *)zbx_hashset_iter_next(&iter)))
		zbx_vector_ptr_append(&results, result);

	zbx_vector_ptr_sort(&results, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);

	data_len = zbx_alerter_serialize_results(&data, (zbx_am_result_t **)results.values, results.values_num);
	zbx_ipc_client_send(client, ZBX_IPC_ALERTER_RESULTS, data, data_len);
	zbx_free(data);

	zbx_hashset_iter_reset(&manager->results, &iter);
	while (NULL != (result = (zbx_am_result_t *)zbx_hashset_iter_next(&iter)))
	{
		zbx_free(result->value);
		zbx_free(result->error);
	}

	zbx_hashset_clear(&manager->results);
	zbx_vector_ptr_destroy(&results);

out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: am_process_external_alert_request                                *
 *                                                                            *
 * Purpose: process external alert request                                    *
 *                                                                            *
 * Parameters: manager - [IN] the alert manager                               *
 *             id      - [IN] client id that sent external alert request      *
 *             data    - [IN] the received message                            *
 *                                                                            *
 ******************************************************************************/
static void	am_process_external_alert_request(zbx_am_t *manager, zbx_uint64_t id, const unsigned char *data)
{
	zbx_uint64_t	mediatypeid;
	char		*sendto, *subject, *message, *params, *smtp_server, *smtp_helo, *smtp_email, *exec_path,
			*gsm_modem, *username, *passwd, *exec_params, *attempt_interval, *script, *timeout;
	unsigned short	smtp_port;
	int		maxsessions, maxattempts;
	unsigned char	type, smtp_security, smtp_verify_peer, smtp_verify_host, smtp_authentication, content_type;

	zbx_am_alert_t		*alert;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_alerter_deserialize_alert_send(data, &mediatypeid, &type, &smtp_server, &smtp_helo, &smtp_email, &exec_path,
			&gsm_modem, &username, &passwd, &smtp_port, &smtp_security, &smtp_verify_peer,
			&smtp_verify_host, &smtp_authentication, &exec_params, &maxsessions, &maxattempts,
			&attempt_interval, &content_type, &script, &timeout, &sendto, &subject, &message,
			&params);

	/* update with initial 'remove' flag so the mediatype is removed if it's not used by other alerts */
	am_update_mediatype(manager, mediatypeid, type, smtp_server, smtp_helo, smtp_email, exec_path,
			gsm_modem, username, passwd, smtp_port, smtp_security, smtp_verify_peer,
			smtp_verify_host, smtp_authentication, exec_params, maxsessions, maxattempts,
			attempt_interval, content_type, script, timeout, ZBX_AM_MEDIATYPE_FLAG_REMOVE);

	alert = am_create_alert(id, mediatypeid, ALERT_SOURCE_EXTERNAL, 0, id, sendto, subject, shared_str_new(message),
			params, content_type, 0, 0, 0);

	if (FAIL == am_queue_alert(manager, alert, 0))
	{
		am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, "Media type unavailable", NULL);
		am_alert_free(alert);
	}

	zbx_free(params);
	zbx_free(smtp_server);
	zbx_free(smtp_helo);
	zbx_free(smtp_email);
	zbx_free(exec_path);
	zbx_free(gsm_modem);
	zbx_free(username);
	zbx_free(passwd);
	zbx_free(exec_params);
	zbx_free(attempt_interval);
	zbx_free(script);
	zbx_free(timeout);
	zbx_free(message);
	zbx_free(subject);
	zbx_free(sendto);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: am_process_begin_dispatch                                        *
 *                                                                            *
 * Purpose: begin file dispatch                                               *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected worker IPC client data            *
 *             message - [IN] the received message                            *
 *                                                                            *
 ******************************************************************************/
static void	am_process_begin_dispatch(zbx_ipc_client_t *client, const unsigned char *data)
{
	zbx_am_dispatch_t *dispatch;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, zbx_ipc_client_id(client));

	dispatch = (zbx_am_dispatch_t*) zbx_malloc(NULL, sizeof(zbx_am_dispatch_t));
	zbx_alerter_deserialize_begin_dispatch(data, &dispatch->subject, &dispatch->message, &dispatch->content_name,
			&dispatch->content_type, &dispatch->content, &dispatch->content_size);

	zbx_ipc_client_set_userdata(client, dispatch);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() name:%s content_type:%s size:%u", __func__, dispatch->content_name,
			dispatch->content_type, dispatch->content_size);
}

/******************************************************************************
 *                                                                            *
 * Function: am_prepare_dispatch_message                                      *
 *                                                                            *
 * Purpose: prepare message to dispatch by attaching dispatch contents for    *
 *          supported media types                                             *
 *                                                                            *
 * Parameters: dispatch     - [IN] the dispatch data                          *
 *             mt           - [IN] the media type                             *
 *             message      - [OUT] the message to send                       *
 *             content_type - [OUT] the message content type                  *
 *                                                                            *
 ******************************************************************************/
static void	am_prepare_dispatch_message(zbx_am_dispatch_t *dispatch, DB_MEDIATYPE *mt, zbx_shared_str_t *message,
		unsigned char *content_type)
{
	char	*body = NULL;

	if (0 != dispatch->content_size)
	{
		if (MEDIA_TYPE_EMAIL == mt->type)
		{
			body = zbx_email_make_body(dispatch->message, mt->content_type, dispatch->content_name,
					dispatch->content_type, dispatch->content, dispatch->content_size);
			*content_type = ZBX_MEDIA_CONTENT_TYPE_MULTI;
		}
	}

	if (NULL == body)
	{
		*message = shared_str_new(dispatch->message);
		*content_type = mt->content_type;
	}
	else
	{
		*message = shared_str_new(body);
		zbx_free(body);
	}
}

/******************************************************************************
 *                                                                            *
 * Function: am_process_send_dispatch                                         *
 *                                                                            *
 * Purpose: send dispatch to the specified media type users                   *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected worker IPC client                 *
 *             message - [IN] the received message                            *
 *                                                                            *
 ******************************************************************************/
static void	am_process_send_dispatch(zbx_am_t *manager, zbx_ipc_client_t *client, const unsigned char *data)
{
	int			i;
	zbx_vector_str_t	recipients;
	zbx_am_alert_t		*alert;
	DB_MEDIATYPE		mt;
	zbx_shared_str_t	message;
	unsigned char		content_type;
	zbx_uint64_t		id;
	zbx_am_dispatch_t	*dispatch;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, zbx_ipc_client_id(client));

	if (NULL == (dispatch = (zbx_am_dispatch_t *)zbx_ipc_client_get_userdata(client)))
	{
		THIS_SHOULD_NEVER_HAPPEN;

		zbx_ipc_client_send(client, ZBX_IPC_ALERTER_ABORT_DISPATCH, NULL, 0);
		goto out;
	}

	id = zbx_ipc_client_id(client);

	zbx_vector_str_create(&recipients);

	zbx_alerter_deserialize_send_dispatch(data, &mt, &recipients);

	/* update with initial 'remove' flag so the mediatype is removed */
	/* if it's not used by other test alerts/dispatches              */
	am_update_mediatype(manager, mt.mediatypeid, mt.type, mt.smtp_server, mt.smtp_helo, mt.smtp_email, mt.exec_path,
			mt.gsm_modem, mt.username, mt.passwd, mt.smtp_port, mt.smtp_security, mt.smtp_verify_peer,
			mt.smtp_verify_host, mt.smtp_authentication, mt.exec_params, mt.maxsessions, mt.maxattempts,
			mt.attempt_interval, mt.content_type, mt.script, mt.timeout, ZBX_AM_MEDIATYPE_FLAG_REMOVE);

	am_prepare_dispatch_message(dispatch, &mt, &message, &content_type);

	for (i = 0; i < recipients.values_num; i++)
	{
		alert = am_create_alert(id, mt.mediatypeid, ALERT_SOURCE_EXTERNAL, 0, id, recipients.values[i],
				dispatch->subject, message, NULL, content_type, 0, 0, 0);

		if (FAIL == am_queue_alert(manager, alert, 0))
		{
			am_external_alert_send_response(&manager->ipc, alert, NULL, FAIL, "Media type unavailable",
					NULL);
			am_alert_free(alert);
		}
	}

	zbx_db_mediatype_clean(&mt);

	zbx_vector_str_clear_ext(&recipients, zbx_str_free);
	zbx_vector_str_destroy(&recipients);
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: am_process_end_dispatch                                          *
 *                                                                            *
 * Purpose: finish sending dispatches                                         *
 *                                                                            *
 * Parameters: client  - [IN] the connected worker IPC client                 *
 *                                                                            *
 ******************************************************************************/
static void	am_process_end_dispatch(zbx_ipc_client_t *client)
{
	zbx_am_dispatch_t	*dispatch;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() clientid:" ZBX_FS_UI64, __func__, zbx_ipc_client_id(client));

	dispatch = (zbx_am_dispatch_t *)zbx_ipc_client_get_userdata(client);
	zbx_ipc_client_set_userdata(client, NULL);
	am_dispatch_free(dispatch);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/******************************************************************************
 *                                                                            *
 * Function: am_remove_unused_mediatypes                                      *
 *                                                                            *
 * Purpose: remove unused media types                                         *
 *                                                                            *
 ******************************************************************************/
static void	am_remove_unused_mediatypes(zbx_am_t *manager)
{
	zbx_hashset_iter_t	iter;
	zbx_am_mediatype_t	*mediatype;

	zbx_hashset_iter_reset(&manager->mediatypes, &iter);
	while (NULL != (mediatype = (zbx_am_mediatype_t *)zbx_hashset_iter_next(&iter)))
	{
		if (0 != (mediatype->flags & ZBX_AM_MEDIATYPE_FLAG_REMOVE) && 0 == mediatype->refcount)
			am_remove_mediatype(manager, mediatype);
	}
}

/******************************************************************************
 *                                                                            *
 * Function: am_process_diag_stats                                            *
 *                                                                            *
 * Purpose: process diagnostic statistics request                             *
 *                                                                            *
 ******************************************************************************/
static void	am_process_diag_stats(zbx_am_t *manager, zbx_ipc_client_t *client)
{
	unsigned char	*data;
	zbx_uint32_t	data_len;

	data_len = zbx_alerter_serialize_diag_stats(&data, manager->alerts_num);
	zbx_ipc_client_send(client, ZBX_IPC_ALERTER_DIAG_STATS_RESULT, data, data_len);
	zbx_free(data);
}

/******************************************************************************
 *                                                                            *
 * Function: am_compare_mediatype_by_alerts_desc                              *
 *                                                                            *
 * Purpose: compare mediatypes by total queued alerts                         *
 *                                                                            *
 ******************************************************************************/
static int	am_compare_mediatype_by_alerts_desc(const void *d1, const void *d2)
{
	zbx_am_mediatype_t	*m1 = *(zbx_am_mediatype_t **)d1;
	zbx_am_mediatype_t	*m2 = *(zbx_am_mediatype_t **)d2;

	return m2->refcount - m1->refcount;
}

/******************************************************************************
 *                                                                            *
 * Function: am_process_diag_top_mediatypes                                   *
 *                                                                            *
 * Purpose: processes top mediatypes by queued alerts                         *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected worker IPC client data            *
 *             message - [IN] the received message                            *
 *                                                                            *
 ******************************************************************************/
static void	am_process_diag_top_mediatypes(zbx_am_t *manager, zbx_ipc_client_t *client,
		const zbx_ipc_message_t *message)
{
	int			limit;
	unsigned char		*data;
	zbx_uint32_t		data_len;

	zbx_vector_ptr_t	view;
	zbx_hashset_iter_t	iter;
	zbx_am_mediatype_t	*mediatype;
	int			mediatypes_num;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_alerter_deserialize_top_request(message->data, &limit);

	zbx_vector_ptr_create(&view);

	zbx_hashset_iter_reset(&manager->mediatypes, &iter);
	while (NULL != (mediatype = (zbx_am_mediatype_t *)zbx_hashset_iter_next(&iter)))
	{
		if (0 != mediatype->refcount)
			zbx_vector_ptr_append(&view, mediatype);
	}

	zbx_vector_ptr_sort(&view, am_compare_mediatype_by_alerts_desc);
	mediatypes_num = MIN(limit, view.values_num);

	data_len = zbx_alerter_serialize_top_mediatypes_result(&data, (zbx_am_mediatype_t **)view.values, mediatypes_num);
	zbx_ipc_client_send(client, ZBX_IPC_ALERTER_DIAG_TOP_MEDIATYPES_RESULT, data, data_len);
	zbx_free(data);

	zbx_vector_ptr_destroy(&view);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

/* alert source hashset support */

static zbx_hash_t	am_source_hash_func(const void *data)
{
	const zbx_am_source_stats_t	*source = (const zbx_am_source_stats_t *)data;

	zbx_hash_t			hash;
	zbx_uint64_t			source_object;

	source_object = (zbx_uint64_t)source->source << 32 | source->object;
	hash = ZBX_DEFAULT_UINT64_HASH_FUNC(&source_object);
	hash = ZBX_DEFAULT_UINT64_HASH_ALGO(&source->objectid, sizeof(source->objectid), hash);

	return hash;
}

static int	am_source_compare_func(const void *d1, const void *d2)
{
	const zbx_am_source_stats_t	*s1 = (const zbx_am_source_stats_t *)d1;
	const zbx_am_source_stats_t	*s2 = (const zbx_am_source_stats_t *)d2;

	ZBX_RETURN_IF_NOT_EQUAL(s1->source, s2->source);
	ZBX_RETURN_IF_NOT_EQUAL(s1->object, s2->object);
	ZBX_RETURN_IF_NOT_EQUAL(s1->objectid, s2->objectid);

	return 0;
}
/******************************************************************************
 *                                                                            *
 * Function: am_process_diag_top_sources                                      *
 *                                                                            *
 * Purpose: processes top alert sources by queued alerts                      *
 *                                                                            *
 * Parameters: manager - [IN] the manager                                     *
 *             client  - [IN] the connected worker IPC client data            *
 *             message - [IN] the received message                            *
 *                                                                            *
 ******************************************************************************/
static void	am_process_diag_top_sources(zbx_am_t *manager, zbx_ipc_client_t *client,
		const zbx_ipc_message_t *message)
{
	int			limit;
	unsigned char		*data;
	zbx_uint32_t		data_len;

	zbx_vector_ptr_t	view;
	zbx_am_alertpool_t	*alertpool;
	zbx_am_alert_t		*alert;
	int			i, sources_num;
	zbx_hashset_t		sources;
	zbx_hashset_iter_t	iter;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_alerter_deserialize_top_request(message->data, &limit);

	zbx_hashset_create(&sources, 1024, am_source_hash_func, am_source_compare_func);
	zbx_vector_ptr_create(&view);

	zbx_hashset_iter_reset(&manager->alertpools, &iter);
	while (NULL != (alertpool = (zbx_am_alertpool_t *)zbx_hashset_iter_next(&iter)))
	{
		for (i = 0; i < alertpool->queue.elems_num; i++)
		{
			zbx_am_source_stats_t	*source, source_local;

			alert = (zbx_am_alert_t *)alertpool->queue.elems[i].data;
			source_local.source = ZBX_ALERTPOOL_SOURCE(alert->alertpoolid);
			source_local.object = ZBX_ALERTPOOL_OBJECT(alert->alertpoolid);
			source_local.objectid = alert->objectid;

			if (NULL == (source = zbx_hashset_search(&sources, &source_local)))
			{
				source = zbx_hashset_insert(&sources, &source_local, sizeof(source_local));
				source->alerts_num = 0;
				zbx_vector_ptr_append(&view, source);
			}
			source->alerts_num++;
		}
	}

	zbx_vector_ptr_sort(&view, am_compare_mediatype_by_alerts_desc);
	sources_num = MIN(limit, view.values_num);

	data_len = zbx_alerter_serialize_top_sources_result(&data, (zbx_am_source_stats_t **)view.values, sources_num);
	zbx_ipc_client_send(client, ZBX_IPC_ALERTER_DIAG_TOP_SOURCES_RESULT, data, data_len);
	zbx_free(data);

	zbx_vector_ptr_destroy(&view);
	zbx_hashset_destroy(&sources);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}

ZBX_THREAD_ENTRY(alert_manager_thread, args)
{
#define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
				/* once in STAT_INTERVAL seconds */

	zbx_am_t		manager;
	char			*error = NULL;
	zbx_ipc_client_t	*client;
	zbx_ipc_message_t	*message;
	zbx_am_alerter_t	*alerter;
	int			ret, sent_num = 0, failed_num = 0, now, time_watchdog = 0, time_ping = 0,
				time_mediatype = 0;
	double			time_stat, time_idle = 0, time_now, sec;

	process_type = ((zbx_thread_args_t *)args)->process_type;
	server_num = ((zbx_thread_args_t *)args)->server_num;
	process_num = ((zbx_thread_args_t *)args)->process_num;

	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);

	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
			server_num, get_process_type_string(process_type), process_num);

	update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);

	if (FAIL == am_init(&manager, &error))
	{
		zabbix_log(LOG_LEVEL_CRIT, "cannot initialize alert manager: %s", error);
		zbx_free(error);
		exit(EXIT_FAILURE);
	}

	manager.dbstatus = ZBX_DB_OK;

	/* initialize statistics */
	time_stat = zbx_time();

	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);

	while (ZBX_IS_RUNNING())
	{
		time_now = zbx_time();
		now = time_now;

		if (time_ping + ZBX_DB_PING_FREQUENCY < now)
		{
			manager.dbstatus = DBconnect(ZBX_DB_CONNECT_ONCE);
			DBclose();
			time_ping = now;
		}
		if (ZBX_DB_DOWN == manager.dbstatus)
		{
			if (0 == time_watchdog)
				zabbix_log(LOG_LEVEL_ERR, "database connection lost");

			if (time_watchdog + ZBX_WATCHDOG_ALERT_FREQUENCY <= now)
			{
				am_queue_watchdog_alerts(&manager);
				time_watchdog = now;
			}
		}
		else if (0 != time_watchdog)
		{
			zabbix_log(LOG_LEVEL_ERR, "database connection re-established");
			time_watchdog = 0;
		}

		if (STAT_INTERVAL < time_now - time_stat)
		{
			zbx_setproctitle("%s #%d [sent %d, failed %d alerts, idle " ZBX_FS_DBL " sec during "
					ZBX_FS_DBL " sec]", get_process_type_string(process_type), process_num,
					sent_num, failed_num, time_idle, time_now - time_stat);

			time_stat = time_now;
			time_idle = 0;
			sent_num = 0;
			failed_num = 0;
		}

		now = time(NULL);

		while (SUCCEED == am_check_queue(&manager, now))
		{
			if (NULL == (alerter = (zbx_am_alerter_t *)zbx_queue_ptr_pop(&manager.free_alerters)))
				break;

			if (FAIL == am_process_alert(&manager, alerter, am_pop_alert(&manager)))
				zbx_queue_ptr_push(&manager.free_alerters, alerter);
		}

		if (time_mediatype + ZBX_AM_MEDIATYPE_CLEANUP_FREQUENCY < now)
		{
			am_remove_unused_mediatypes(&manager);
			time_mediatype = now;
		}

		update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
		ret = zbx_ipc_service_recv(&manager.ipc, 1, &client, &message);
		update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);

		sec = zbx_time();
		zbx_update_env(sec);

		if (ZBX_IPC_RECV_IMMEDIATE != ret)
			time_idle += sec - time_now;

		if (NULL != message)
		{
			switch (message->code)
			{
				case ZBX_IPC_ALERTER_REGISTER:
					am_register_alerter(&manager, client, message);
					break;
				case ZBX_IPC_ALERTER_RESULT:
					if (SUCCEED == am_process_result(&manager, client, message))
						sent_num++;
					else
						failed_num++;
					break;
				case ZBX_IPC_ALERTER_SEND_ALERT:
					am_process_external_alert_request(&manager, zbx_ipc_client_id(client),
							message->data);
					break;
				case ZBX_IPC_ALERTER_MEDIATYPES:
					am_update_mediatypes(&manager, message);
					break;
				case ZBX_IPC_ALERTER_ALERTS:
					am_queue_alerts(&manager, message, now);
					break;
				case ZBX_IPC_ALERTER_WATCHDOG:
					am_update_watchdog(&manager, message);
					break;
				case ZBX_IPC_ALERTER_RESULTS:
					am_flush_results(&manager, client);
					break;
				case ZBX_IPC_ALERTER_DROP_MEDIATYPES:
					am_drop_mediatypes(&manager, message);
					break;
				case ZBX_IPC_ALERTER_DIAG_STATS:
					am_process_diag_stats(&manager, client);
					break;
				case ZBX_IPC_ALERTER_DIAG_TOP_MEDIATYPES:
					am_process_diag_top_mediatypes(&manager, client, message);
					break;
				case ZBX_IPC_ALERTER_DIAG_TOP_SOURCES:
					am_process_diag_top_sources(&manager, client, message);
					break;
				case ZBX_IPC_ALERTER_BEGIN_DISPATCH:
					am_process_begin_dispatch(client, message->data);
					break;
				case ZBX_IPC_ALERTER_SEND_DISPATCH:
					am_process_send_dispatch(&manager, client, message->data);
					break;
				case ZBX_IPC_ALERTER_END_DISPATCH:
					am_process_end_dispatch(client);
					break;
			}

			zbx_ipc_message_free(message);
		}

		if (NULL != client)
			zbx_ipc_client_release(client);
	}

	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);

	while (1)
		zbx_sleep(SEC_PER_MIN);

	zbx_ipc_service_close(&manager.ipc);
	am_destroy(&manager);
}
